package chapter10;

import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author eric
 * Description: 报警代理
 */
public class AlarmAgent {

    /**
     * 是否已经连接上服务器
     * 默认没有连接上（false）
     */
    private volatile boolean connectedToServer = false;

    /**
     * 条件处理
     */
    Predicate agentConnected = () -> connectedToServer;

    /**
     * blocker对象
     */
    private Blocker blocker = new ConditionVarBlocker(false);


    /**
     * 初始化代理
     */
    public void init() {
        // <1.0> 连接报警服务器
        Thread connectToAlarmserver = new Thread(new ConnectedAlarmServerTask());
        // 开始连接
        connectToAlarmserver.start();

        // <2.0> 定时发送心跳校验是否连接成功
        ScheduledThreadPoolExecutor heatbeatExcutor = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {

            private AtomicInteger index = new AtomicInteger();

            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread();
                thread.setName("heatbeat-thread-" + index);
                // 设置守护线程，当jvm退出的时候退出
                thread.setDaemon(true);
                return thread;
            }
        });

        // 5s 连接一次
        heatbeatExcutor.scheduleAtFixedRate(new HeartBeatTask(), 5000, 3000, TimeUnit.MILLISECONDS);
    }

    /**
     * 连接报警服务器的线程
     */
    class ConnectedAlarmServerTask implements Runnable {

        @Override
        public void run() {
            // 走socketChannel的方式和告警服务器建立一个连接
            // 这里我们就简单模拟一下10s后建立连接
            try {
                Thread.sleep(10 * 1000);
            } catch (
                    InterruptedException e) {
                e.printStackTrace();
            }

            // 连接建立完成
            System.out.println("alarm connected");

            onConnected();
        }
    }

    /**
     * 发送心跳的线程
     */
    class HeartBeatTask implements Runnable {

        @Override
        public void run() {
            // 通过socket来发送心跳
            // 若不通过， 则断开重连
            if (!testConnected()) {
                onDisconnected();
                reconnected();
            }
        }

    }

    /**
     * 验证是否连接成功吗
     *
     * @return
     */
    private boolean testConnected() {
        // 通过socket 发送心跳
        try {
            // 模拟发送时常
            Thread.sleep(50);
            return true;
        } catch (InterruptedException e) {
            System.out.println("heart beat fail...");
            return false;
        }
    }

    /**
     * 连接失败后调用
     */
    public void onDisconnected() {
        // 通过volatile的语义让其他线程读取到，其他线程上报报警消息是stateOperation不满足则阻塞
        connectedToServer = false;
    }

    /**
     * 确认和报警服务器建立链表
     */
    private void onConnected() {
        // 通过blocker去唤醒
        try {
            // 当连接上后，此时也许有线程在等待发送信息，那么需要唤醒等待的线程。
            blocker.signalAfter(new Callable<Boolean>() {
                @Override
                public Boolean call() throws Exception {
                    // 唤醒前的状态动作
                    // 修改连接报警服务器的状态
                    System.out.println("update connectedServer = true");
                    connectedToServer = true;
                    // 条件满足，执行唤醒
                    return Boolean.TRUE;
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 重新连接
     */
    private void reconnected() {
        ConnectedAlarmServerTask connectedAlarmServerTask = new ConnectedAlarmServerTask();
        // 直接通过心跳线程来重新连接，不用再开一个线程
        connectedAlarmServerTask.run();
    }

    /**
     * 上报报警信息给报警服务
     */
    public void sendAlarm(AlarmInfo alarmInfo) throws Exception {
        // 构建guardedAction
        GuardedAction<Void> guardedAction = new GuardedAction<Void>(agentConnected) {
            @Override
            public Void call() throws Exception {
                doSendAlarm(alarmInfo);
                return null;
            }
        };

        // 通过blocker执行目标
        blocker.callWithGuard(guardedAction);
    }


    /**
     * 发送报警信息给报警服务器
     *
     * @param alarmInfo 报警信息
     */
    private void doSendAlarm(AlarmInfo alarmInfo) {
        // 建立socket连接发送数据给报警信息
        System.out.println("start send alarm:" + alarmInfo);
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 模拟上报50ms
        System.out.println("end send alarm");
    }

}
